from pyspark.sql.connect.dataframe import DataFrame

from com.zsw.config.SparkBase import SparkBase


class MySQLReader:
    def __init__(self, url, table, properties):
        self.url = url
        self.table = table
        self.properties = properties

    def read(self):
        spark = SparkBase.get_spark()
        return spark.read \
            .jdbc(url=self.url, table=self.table, properties=self.properties)


class MySQLWriter:
    def __init__(self, url, table, properties):
        self.url = url
        self.table = table
        self.properties = properties

    def write(self, df: DataFrame, mode: str = "overwrite"):
        df.write \
            .format("jdbc") \
            .option("url", self.url) \
            .option("dbtable", self.table) \
            .options(**self.properties) \
            .mode(mode) \
            .save()

